跳到主要内容

WebSocket 心跳机制和重连实现

心跳和重连是什么?

WebSocket 为了保持客户端、服务端的实时双向通信,需要确保客户端、服务端之间的 TCP 通道保持连接没有断开。 在使用 websocket 的过程中,有时候会遇到网络断开的情况,但是在网络断开的时候服务器端并没有触发 onClose 的事件。

这样会有:服务器会继续向客户端发送多余的链接,并且这些数据还会丢失。

所以就需要一种机制来检测客户端和服务端是否处于正常的链接状态。因此就有了 websocket 的心跳了。还有心跳,说明还活着,没有心跳说明已经挂掉了。

例如: 发送方 -> 接收方:ping 接收方 -> 发送方:pong ping、pong 的操作,对应的是 WebSocket 的两个控制帧,opcode 分别是 0x90xA

那重连操作是什么呢?

心跳机制是每隔一段时间会向服务器发送一个数据包,告诉服务器自己还活着,同时客户端会确认服务器端是否还活着,如果还活着的话,就会回传一个数据包给客户端来确定服务器端也还活着,否则的话,有可能是网络断开连接了。需要重连~

实现代码

实现心跳检测的思路是:每隔一段固定的时间,向服务器端发送一个 ping 数据,如果在正常的情况下,服务器会返回一个 pong 给客户端,如果客户端通过 onmessage 事件能监听到的话,说明请求正常

这里我们使用了一个定时器,每隔 3 秒的情况下,如果是网络断开的情况下,在指定的时间内服务器端并没有返回心跳响应消息,因此服务器端断开了,因此这个时候我们使用 ws.close 关闭连接,在一段时间后,可以通过 onclose 事件监听到。因此在 onclose 事件内,我们可以调用 reconnect 事件进行重连操作。

发送方 -> 接收方:ping 接收方 -> 发送方:pong ping、pong 的操作,对应的是 WebSocket 的两个控制帧,opcode 分别是 0x90xA

前端部分

export default class ReconnectingWebSocket {
//These can be altered by calling code
public debug: boolean = false;

//Time to wait before attempting reconnect (after close)
public reconnectInterval: number = 1000;
//Time to wait for WebSocket to open (before aborting and retrying)
public timeoutInterval: number = 2000;

//Should only be used to read WebSocket readyState
public readyState: number;

//Whether WebSocket was forced to close by this client
private forcedClose: boolean = false;
//Whether WebSocket opening timed out
private timedOut: boolean = false;

//List of WebSocket sub-protocols
private protocols: string[] = [];

//The underlying WebSocket
private ws: WebSocket | null;
private url: string;

/**
* Setting this to true is the equivalent of setting all instances of ReconnectingWebSocket.debug to true.
*/
public static debugAll = false;

//Set up the default 'noop' event handlers
public onopen: (ev: Event) => void = (event: Event) => null;
public onclose: (ev: CloseEvent) => void = (event: CloseEvent) => null;
public onconnecting: () => void = () => null;
public onmessage: (ev: MessageEvent) => void = (event: MessageEvent) => null;
public onerror: (ev: ErrorEvent) => void = (event: ErrorEvent) => null;

/**
* 创建一个带有心跳和重连机制的 WebSocket 客户端
*
* @param url url
* @param protocols 指定子协议(就是携带一个自带的头) 参考资料 https://segmentfault.com/q/1010000022700936
*/
constructor(url: string, protocols: string[] = []) {
this.url = url;
this.protocols = protocols;
this.readyState = WebSocket.CONNECTING;
this.ws = null;
this.connect(false);
}

/**
* 连接
* @param reconnectAttempt 是否重连
*/
public connect(reconnectAttempt: boolean): void {
this.ws = new WebSocket(this.url, this.protocols);
// 执行自定义的重连操作
this.onconnecting();
this.log('ReconnectingWebSocket', 'attempt-connect', this.url);

const localWs = this.ws;

// 这里是连接超时时会执行的方法(如果连接成功则会执行 clearTimeout(timeout) 方法来避免执行回调函数)
// 这里的超时操作只是用来判断这个 connect 方法有没有执行成功的,与心跳无关
const timeout = setTimeout(() => {
this.log('ReconnectingWebSocket', 'connection-timeout', this.url);
this.timedOut = true;
// 超时关闭
localWs.close();
this.timedOut = false;
}, this.timeoutInterval);

this.ws.onopen = (event: Event) => {
clearTimeout(timeout);
this.log('ReconnectingWebSocket', 'onopen', this.url);
this.readyState = WebSocket.OPEN;
reconnectAttempt = false;
// 开启一个心跳检测
this.startHeartCheck(this.ws as WebSocket);
this.onopen(event);
};

/**
* 别弄混了,当调用了 ws.close() 方法后会回调这个 onclose 方法
*/
this.ws.onclose = (event: CloseEvent) => {
clearTimeout(timeout);
this.ws = null;
if (this.forcedClose) {
this.readyState = WebSocket.CLOSED;
this.onclose(event);
}
// 如果不是强制关闭,还有可能能重连
else {
// 超时了,重新连接
this.readyState = WebSocket.CONNECTING;
// 执行自定义的重连操作
this.onconnecting();
// 如果没有开启重试或超时,则执行自定义的 onclose 事件
if (!reconnectAttempt && !this.timedOut) {
this.log('ReconnectingWebSocket', 'onclose', this.url);
this.onclose(event);
}
setTimeout(() => {
this.connect(true);
}, this.reconnectInterval);
}
};

this.ws.onmessage = (event) => {
this.log('ReconnectingWebSocket', 'onmessage', this.url, event.data);
// 拿到任何消息都说明当前连接是正常的,重置心跳检查(就是靠这个 onmessage 维系心跳检测的)
this.startHeartCheck(this.ws as WebSocket);
this.onmessage(event);
};

this.ws.onerror = (event) => {
this.log('ReconnectingWebSocket', 'onerror', this.url, event);
this.onerror(event as ErrorEvent);
};
}

public send(data: any): void {
if (this.ws) {
this.log('ReconnectingWebSocket', 'send', this.url, data);
return this.ws.send(data);
} else {
throw 'INVALID_STATE_ERR : Pausing to reconnect websocket';
}
}

/**
* Returns boolean, whether websocket was FORCEFULLY closed.
*/
public close(): boolean {
if (this.ws) {
this.forcedClose = true;
this.ws.close();
return true;
}
return false;
}

/**
* Additional public API method to refresh the connection if still open (close, re-open).
* For example, if the app suspects bad data / missed heart beats, it can try to refresh.
*
* Returns boolean, whether websocket was closed.
*/
public refresh(): boolean {
if (this.ws) {
this.ws.close();
return true;
}
return false;
}

private log(...args: any[]): void {
if (this.debug || ReconnectingWebSocket.debugAll) {
console.debug(args);
}
}

private timeoutObj: number = 0;
private serverTimeoutObj: number = 0;
private heartInterval: number = 1000;

/**
* 心跳检测
*/
private startHeartCheck = (ws: WebSocket): void => {
console.log('start');
this.timeoutObj && clearTimeout(this.timeoutObj);
this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);

this.timeoutObj = setTimeout(() => {
//这里发送一个心跳,后端收到后,返回一个心跳消息
ws.send('0x9'); // 这里随便发什么,只要确保后端收到心跳请求能返回数据就行了,这里规定 ping:0x9、pong:0xA
if (this.timedOut) {
const that = this;
this.serverTimeoutObj = setTimeout(() => {
// 如果超时了则关闭 Socket 的连接
that.close();
// createWebSocket();
}, this.timeoutInterval);
}
}, this.heartInterval);
};
}

后端的响应心跳消息

只需要简单的响应这个客户端就行了

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + sid + "的信息:" + message);
// 收到心跳检测请求
if ("0x9".equals(message)) {
WebSocketServer item = webSocketMap.get(sid);
try {
item.sendMessage("0xA");
} catch (IOException e) {
e.printStackTrace();
}
}
// do something...
}

完整的代码看 Gist(这里使用的是 spring-boot-starter-websocket):